#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <pthread.h>
#include <getopt.h>


#define DBG_SUBSYS S_LIBYNET

#include "ynet_rpc.h"
#include "sock_unix.h"
#include "corenet.h"
#include "rpc_proto.h"
#include "../../ynet/sock/sock_tcp.h"
#include "dbg.h"

//#define MAX_BUF_LEN 65536

#define TEST_NET_PORT  "12339"
#define MSG_RECV_SIZE 64
#define MSG_SEND_SIZE 4096

#if 1
#define USE_EPOLL
#endif

#define USE_CORENET 0
#define USE_SCHEDULE 0

typedef struct {
        sem_t sem;
        uint64_t io;
        int sd;
        int sendsize;
        int recvsize;
} prof_net_arg_t;

typedef enum {
        OP_SRV,
        OP_CLI,
} op_t;

typedef struct {
        int sd;
        int running;
        void *ptr;
} session_t;

static int __listen_sd__;
struct sockaddr_un __listen_addr__;
struct sockaddr_un __conn_addr__;
static int __epoll_fd__;

#if 1
#define USE_UNIX_DOMAIN
#define __LISTEN_PATH__ "/tmp/prof_net.socket"
#endif

#if 0

static void __corenet_net_exec(void *arg)
{
        int ret;
        session_t *session = arg;
        char tmp[MSG_SEND_SIZE];

        ret = write(session->sd, tmp, MSG_SEND_SIZE);
        if (ret == -1) {
                DWARN("session close\n");
        }
}

static void __corenet_newtask(session_t *session)
{
        schedule_task_new("corenet_net", __corenet_net_exec, session, -1);
}

#else

static void __corenet_net_exec(void *arg)
{
        int ret;
        rpc_request_t *rpc_request = arg;
        char buf[MAX_BUF_LEN];

        mbuffer_popmsg(&rpc_request->buf, buf, rpc_request->buf.len);

#if 0
        (void) ret;
        buffer_t _buf;
        mbuffer_init(&_buf, 0);
        mbuffer_appendzero(&_buf, MSG_SEND_SIZE);
        corenet_send(&rpc_request->sockid, &_buf, 0);
#else

        ret = write(rpc_request->sockid.sd, rpc_request->ctx, MSG_SEND_SIZE);
        if (ret == -1) {
                DWARN("session close\n");
        }
#endif

        mbuffer_free(&rpc_request->buf);
        mem_cache_free(MEM_CACHE_4K, rpc_request->ctx);
        mem_cache_free(MEM_CACHE_128, rpc_request);
}

static void __corenet_newtask(session_t *session)
{
        rpc_request_t *rpc_request;

        rpc_request = mem_cache_calloc(MEM_CACHE_128, 0);
        mbuffer_init(&rpc_request->buf, 0);
        mbuffer_appendmem(&rpc_request->buf, session->ptr, MSG_RECV_SIZE);
        rpc_request->sockid.sd = session->sd;
        rpc_request->sockid.type = SOCKID_CORENET;
        rpc_request->sockid.addr = 123;

        rpc_request->ctx = mem_cache_calloc(MEM_CACHE_4K, 0);

        schedule_task_new("corenet_net", __corenet_net_exec, rpc_request, -1);
}

#endif

static int __corenet_corenet_exec(void *ctx, void *buf, int *_count)
{
        int count;
        buffer_t *_buf = buf;
        char tmp[MSG_RECV_SIZE];

        count = 0;
        while (_buf->len >= MSG_RECV_SIZE) {
                mbuffer_popmsg(_buf, tmp, MSG_RECV_SIZE);

                __corenet_newtask(ctx);
                count++;
        }

        *_count = count;

        return 0;
}

static void __corenet_reset(void *arg)
{
        session_t *session = arg;

        session->running = 0;
}

inline static void *__corenet_srv_wroker(void *arg)
{
        int ret;
        session_t *session = arg;
        char out[MAX_BUF_LEN];
        sockid_t sockid;

        DINFO("work start\n");

        session->running = 1;

        ret = corenet_tcp_init(32768, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        sockid.sd = session->sd;
        sockid.type = SOCKID_CORENET;
        sockid.addr = 123;
        session->ptr = out;
        ret = corenet_tcp_add(NULL, &sockid, session, __corenet_corenet_exec,
                              __corenet_reset, NULL, NULL, "perf");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = schedule_create(NULL, "corenet_net", NULL, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        while (session->running) {
                ret = corenet_tcp_poll(gloconf.polling_timeout);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                schedule_run(NULL);

                corenet_tcp_commit();
        }

        DINFO("work exit\n");
        return NULL;
err_ret:
        DINFO("work exit\n");

        close(session->sd);
        yfree((void **)&session);
        return NULL;
}

#if 0

static void __schedule_net_exec(void *arg)
{
        int ret;
        session_t *session = arg;
        char tmp[MSG_SEND_SIZE];

        ret = write(session->sd, tmp, MSG_SEND_SIZE);
        if (ret == -1) {
                DWARN("session close\n");
        }
}

static void __schedule_newtask(session_t *session)
{
        schedule_task_new("schedule_net", __schedule_net_exec, session, -1);
}

#else

static void __schedule_net_exec(void *arg)
{
        int ret;
        rpc_request_t *rpc_request = arg;
        char buf[MAX_BUF_LEN], tmp[MSG_SEND_SIZE];

        mbuffer_popmsg(&rpc_request->buf, buf, rpc_request->buf.len);

        ret = write(rpc_request->sockid.sd, tmp, MSG_SEND_SIZE);
        if (ret == -1) {
                DWARN("session close\n");
        }

        mbuffer_free(&rpc_request->buf);
        mem_cache_free(MEM_CACHE_128, rpc_request);
}

static void __schedule_newtask(session_t *session)
{
        rpc_request_t *rpc_request;

        rpc_request = mem_cache_calloc(MEM_CACHE_128, 0);
        mbuffer_init(&rpc_request->buf, 0);
        mbuffer_appendmem(&rpc_request->buf, session->ptr, MSG_RECV_SIZE);
        rpc_request->sockid.sd = session->sd;

        schedule_task_new("schedule_net", __schedule_net_exec, rpc_request, -1);
}

#endif

inline static int __schedule_recv(int sd, buffer_t *_buf)
{
        int ret, iov_count, toread;
        struct msghdr msg;
        buffer_t buf;
        struct iovec iov[100];

        ret = ioctl(sd, FIONREAD, &toread);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (toread == 0) {
                ret = ECONNRESET;
                GOTO(err_ret, ret);
        }

        ret = mbuffer_init(&buf, toread);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(buf.len <= CORE_IOV_MAX * BUFFER_SEG_SIZE);

        iov_count = CORE_IOV_MAX;
        ret = mbuffer_trans(iov, &iov_count,  &buf);
        YASSERT(iov_count < 100);
        YASSERT(ret == (int)buf.len);
        memset(&msg, 0x0, sizeof(msg));
        msg.msg_iov = iov;
        msg.msg_iovlen = iov_count;

        DBUG("read data %u\n", toread);

        ret = _recvmsg(sd, &msg, MSG_DONTWAIT);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_free, ret);
        }

        YASSERT(ret == (int)buf.len);
        mbuffer_merge(_buf, &buf);
        DBUG("new recv %u, left %u\n", buf.len, _buf->len);

        return 0;
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

inline static void *__schedule_srv_wroker(void *arg)
{
        int ret;
        session_t *session = arg;
        char in[MAX_BUF_LEN];
        struct pollfd pfd;

        DINFO("work start\n");

        pfd.fd = session->sd;
        pfd.events = POLLIN;
        pfd.revents = 0;

        (void) pfd;

        ret = schedule_create(NULL, "schedule_net", NULL, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        while (1) {
#if 1
                ret = poll(&pfd, 1, 0);
                if (ret == -1) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                if (pfd.revents == 0)
                        continue;
#endif

#if 0
                ret = read(session->sd, in, MSG_RECV_SIZE);
                if (ret == -1) {
                        ret = errno;
                        if (ret == EAGAIN) {
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

#else
                buffer_t buf;
                mbuffer_init(&buf, 0);
                ret = __schedule_recv(session->sd, &buf);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                mbuffer_free(&buf);
#endif

                session->ptr  = in;
                __schedule_newtask(session);
                //DBUG("new task, len %u\n", head.len + head.blocks);
                schedule_run(NULL);
        }

        DINFO("work exit\n");
        return NULL;
err_ret:
        DINFO("work exit\n");

        close(session->sd);
        yfree((void **)&session);
        return NULL;
}

inline static void *__test_srv_wroker(void *arg)
{
        int ret;
        session_t *session = arg;
        char in[MAX_BUF_LEN], out[MAX_BUF_LEN];
        struct pollfd pfd;

        DINFO("work start\n");

        pfd.fd = session->sd;
        pfd.events = POLLIN;
        pfd.revents = 0;

        (void) pfd;

        while (1) {
#if 1
                ret = poll(&pfd, 1, 0);
                if (ret == -1) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                if (pfd.revents == 0)
                        continue;
#endif

#if 1
                int toread;
                ret = ioctl(session->sd, FIONREAD, &toread);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
#endif
                
                ret = read(session->sd, in, MSG_RECV_SIZE);
                if (ret == -1) {
                        ret = errno;
                        if (ret == EAGAIN) {
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                if (ret == 0) {
                        ret = ECONNRESET;
                        GOTO(err_ret, ret);
                }

                ret = write(session->sd, out, MSG_SEND_SIZE);
                if (ret == -1) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
        }

        DINFO("work exit\n");
        return NULL;
err_ret:
        DINFO("work exit\n");

        close(session->sd);
        yfree((void **)&session);
        return NULL;
}

static int __test_srv_accept(int srv_sd)
{
        int ret, sd;
        struct sockaddr_in sin;
        socklen_t alen;
        pthread_t th;
        pthread_attr_t ta;
        session_t *session;

#ifdef USE_UNIX_DOMAIN
        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(struct sockaddr_un);

        sd = accept(srv_sd, &sin, &alen);
        if (sd < 0 ) {
                ret = errno;
		GOTO(err_ret, ret);
        }

        ret = sock_unix_tuning(sd);
        if (unlikely(ret))
                GOTO(err_sd, ret);

#if 0
        ret = sock_setblock(sd);
        if (unlikely(ret)) {
                DERROR("%d - %s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }
#endif
#else        
        _memset(&sin, 0, sizeof(sin));
        alen = sizeof(struct sockaddr_in);

        sd = accept(srv_sd, &sin, &alen);
        if (sd < 0 ) {
                ret = errno;
		GOTO(err_ret, ret);
        }

        ret = tcp_sock_tuning(sd, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        ret = sock_setblock(sd);
        if (unlikely(ret)) {
                DERROR("%d - %s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }
#endif
#endif

        DINFO("accept new sd %u\n", sd);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = ymalloc((void **)&session, sizeof(*session));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        session->sd = sd;

#if USE_CORENET
        ret = pthread_create(&th, &ta, __corenet_srv_wroker, session);
        if (unlikely(ret))
                GOTO(err_sd, ret);

#elif USE_SCHEDULE
        ret = pthread_create(&th, &ta, __schedule_srv_wroker, session);
        if (unlikely(ret))
                GOTO(err_sd, ret);
#else
        ret = pthread_create(&th, &ta, __test_srv_wroker, session);
        if (unlikely(ret))
                GOTO(err_sd, ret);

#endif

        return 0;
err_sd:
        (void) close(sd);
err_ret:
        return ret;
}

static void *__prof_srv(void *arg)
{
        int ret;
        (void) arg;

        while (1) {
                ret = __test_srv_accept(__listen_sd__);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return NULL;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

int prof_net_init()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

#ifdef USE_EPOLL
        __epoll_fd__ = epoll_create(1024);
        if (__epoll_fd__ < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        DINFO("use epoll\n");
#else
        __epoll_fd__ = -1;

        DINFO("use poll\n");
        //DINFO("epoll fd %u\n", __epoll_fd__);
#endif

#ifdef USE_UNIX_DOMAIN
        ret = sock_unix_listen(__LISTEN_PATH__, &__listen_sd__, &__listen_addr__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("listen @ %s, sd %u\n", __LISTEN_PATH__, __listen_sd__);
#else

        //ret = net_hostconnect(&proxysd, host, "10090", YNET_RPC_BLOCK);
        ret = tcp_sock_hostlisten(&__listen_sd__, NULL, TEST_NET_PORT,
                                  YNET_QLEN, YNET_RPC_BLOCK, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __prof_srv, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


static void *__test_cli_worker(void *_arg)
{
        int ret, pipe_in[2], pipe_out[2];
        prof_net_arg_t *arg = _arg;
        char *in = NULL, *out = NULL;
        struct pollfd pfd;

#if 1
        ret = posix_memalign((void **)&in, 4096, MAX_BUF_LEN);
        if (ret < 0) {
                ret = ENOMEM;
                UNIMPLEMENTED(__DUMP__);
        }

        ret = posix_memalign((void **)&out, 4096, MAX_BUF_LEN);
        if (ret < 0) {
                ret = ENOMEM;
                UNIMPLEMENTED(__DUMP__);
        }
#endif

        ret = pipe(pipe_in);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        ret = pipe(pipe_out);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        while (1) {
                strcpy(in, "xxxxxxxxx");
#ifndef USE_SPLICE
                ret = send(arg->sd, in, MSG_RECV_SIZE, MSG_DONTWAIT);
                if (ret < 0) {
                        ret = errno;
                        UNIMPLEMENTED(__DUMP__);
                }
#else
                ret = __splice_out(arg->sd, pipe_out, out, MSG_RECV_SIZE);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }
#endif

                YASSERT(ret == MSG_RECV_SIZE);

                pfd.fd = arg->sd;
                pfd.events = POLLIN;
                pfd.revents = 0;

                (void) pfd;
                while (1) {
#if 1
                        ret = poll(&pfd, 1, 0);
                        if (ret == -1) {
                                ret = errno;
                                UNIMPLEMENTED(__DUMP__);
                        }

                        if (pfd.revents == 0)
                                continue;
#endif

                
#ifndef USE_SPLICE
                        ret = recv(arg->sd, out, MSG_SEND_SIZE, 0);
                        if (ret < 0) {
                                ret = errno;
                                if (ret == EAGAIN) {
                                        continue;
                                } else
                                        UNIMPLEMENTED(__DUMP__);
                        }
#else
                        ret = __splice_in(arg->sd, pipe_out, out, MSG_SEND_SIZE);
                        if (ret < 0) {
                                ret = -ret;
                                UNIMPLEMENTED(__DUMP__);
                        }
#endif

                        YASSERT(ret == MSG_SEND_SIZE);

                        arg->io++;

                        break;
                }
        }

        return NULL;
}

static void __test_cli_print(prof_net_arg_t *args, int threads, struct timeval *begin)
{
        int i;
        uint64_t io, used;
        struct timeval now;

        io = 0;
        for (i = 0; i < threads; i++) {
                io += args[i].io;
        }

        _gettimeofday(&now, NULL);

        used = _time_used(begin, &now);

        DBUG("io %u, used %u\n", (int)io, (int)used);
        printf("iops : %llu\n", (long long)io * 1000 * 1000 /  used);
}

int prof_net(const char *srv, int threads, int runtime)
{
        int ret, i, sd;
        net_handle_t nh;
        prof_net_arg_t args[1024], *arg;
        pthread_t th;
        pthread_attr_t ta;

        ret = conf_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        for (i = 0; i < threads; i++) {
                arg = &args[i];
                arg->io = 0;

#ifdef USE_UNIX_DOMAIN
                (void) srv;
                (void) nh;

                ret =  sock_unix_connect(__LISTEN_PATH__, &sd, &__conn_addr__);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = sock_unix_tuning(sd);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = sock_setblock(sd);
                if (unlikely(ret)) {
                        DERROR("%d - %s\n", ret, strerror(ret));
                        GOTO(err_ret, ret);
                }

                arg->sd = sd;
#else
                
                ret = tcp_sock_hostconnect(&nh, srv, TEST_NET_PORT, 0, 10, 0);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                arg->sd = nh.u.sd.sd;

                ret = tcp_sock_tuning(arg->sd, 1, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
#endif

                ret = pthread_create(&th, &ta, __test_cli_worker, arg);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        struct timeval begin;
        _gettimeofday(&begin, NULL);

        for (i = 0; i < runtime; i++) {
                sleep(1);
                __test_cli_print(args, threads, &begin);
        }

        return 0;
err_ret:
        return ret;
}
